package cn.doitedu.rtmk.utils;

import cn.doitedu.common.pojo.EventBean;
import cn.doitedu.rtmk.functions.RowToRuleMetaBean;
import cn.doitedu.rtmk.pojo.RuleMetaBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Static factory methods that build the input streams used by the job:
 * a stream of user events and a stream of rule metadata changes.
 *
 * <p>This class only holds static helpers and is not meant to be instantiated.
 */
public final class DataStreamUtil {

    /** Not instantiable: every member of this class is static. */
    private DataStreamUtil() {
    }

    /**
     * Builds a stream of {@link EventBean} by reading text lines from a socket
     * and parsing each line as JSON.
     *
     * <p>The socket endpoint is hard-coded to host {@code doitedu}, port {@code 9099}.
     *
     * @param env the execution environment the socket source is attached to
     * @return a stream with one {@link EventBean} per received line
     */
    public static DataStream<EventBean> getUserEventStream(StreamExecutionEnvironment env) {

        DataStreamSource<String> lines = env.socketTextStream("doitedu", 9099);

        SingleOutputStreamOperator<EventBean> events = lines.map(new MapFunction<String, EventBean>() {
            @Override
            public EventBean map(String value) throws Exception {
                // Each incoming line is expected to hold one JSON-encoded event.
                return JSON.parseObject(value, EventBean.class);
            }
        });

        return events;
    }

    /**
     * Registers the {@code rule_meta_cdc} table (a {@code mysql-cdc} view over
     * {@code doti34.rule_instance_resource}) in the given table environment and
     * returns its changelog as a stream of {@link RuleMetaBean}.
     *
     * <p>Side effect: executes a {@code CREATE TABLE rule_meta_cdc} statement on
     * {@code tEnv}. The connection settings (host, port, user, password) are
     * hard-coded in the DDL below.
     *
     * @param tEnv the table environment in which the CDC table is created and read
     * @return the table's changelog rows converted by {@link RowToRuleMetaBean}
     */
    public static DataStream<RuleMetaBean> getRuleMetaDataStream(StreamTableEnvironment tEnv) {

        // NOTE(review): `binary` without a length is BINARY(1) in Flink SQL; if the
        // crowd bitmap column holds arbitrary-length bytes, BYTES is probably the
        // intended type - confirm against the MySQL schema before changing.
        // NOTE(review): credentials are embedded in source; consider externalizing them.
        tEnv.executeSql("CREATE TABLE rule_meta_cdc (    " +
                " `rule_id` STRING   PRIMARY KEY NOT ENFORCED,      " +
                " `rule_model_id` STRING   ,                   " +
                " `rule_param_json` string,                 " +
                " `static_profile_crowed_bm` binary  ,      " +
                " `rule_caculator_groovy_code` string,      " +
                " `rule_status` int                         " +
                "  ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',            " +
                "     'hostname' = 'doitedu'   ,            " +
                "     'port' = '3306'          ,            " +
                "     'username' = 'root'      ,            " +
                "     'password' = 'root'      ,            " +
                "     'database-name' = 'doti34',           " +
                "     'table-name' = 'rule_instance_resource' " +
                ")");

        Table ruleMetaTable = tEnv.from("rule_meta_cdc");

        // A changelog stream carries the row kind (insert/update/delete) on each Row.
        DataStream<Row> changelogRows = tEnv.toChangelogStream(ruleMetaTable);

        return changelogRows.map(new RowToRuleMetaBean());
    }

}
